package main

import (
	"fmt"
	mqtt "github.com/eclipse/paho.mqtt.golang"
	"os"
	"strconv"
	"time"
)


// f is the log file that the message handler writes latency lines to; main
// opens it.
var f *os.File
// ready is sent to by sub after its subscribe loop completes and received
// from by pub before it starts publishing.
var ready chan struct{} // ready to start

// f2 is the message handler registered for every subscription made by sub.
//
// It parses the payload as an integer, subtracts it from the current
// time.Now().UnixNano() value, prints the difference and appends the same
// line to the log file f. pub publishes its own UnixNano send time as the
// payload, so the difference is the publish-to-receive delay in nanoseconds
// (meaningful only when both sides share a clock).
//
// Payloads that fail to parse are reported on stdout and otherwise ignored.
var f2 mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
	// Parse with an explicit 64-bit size: bitSize 0 means platform int, which
	// is too small for nanosecond timestamps on 32-bit builds.
	sentAt, err := strconv.ParseInt(string(msg.Payload()), 0, 64)
	if err != nil {
		fmt.Println(err)
		return
	}

	s := "纳秒: " + strconv.FormatInt(time.Now().UnixNano()-sentAt, 10) + "\n"

	fmt.Println(s)

	// Report write failures instead of silently losing measurements.
	if _, err := f.WriteString(s); err != nil {
		fmt.Println(err)
	}
}

// sub connects 1000 MQTT clients (IDs "hz_0" .. "hz_999") to the broker at
// tcp://127.0.0.1:1883 and subscribes each one, at QoS 0 with handler f2, to
// its own topic "/dev/1/2/3/4/5/6/7/8/9/10/<index>".
//
// A failed connect panics. A failed subscribe is printed and sub returns
// without sending on ready. When all clients are subscribed, sub sends one
// value on ready.
func sub() {
	const (
		brokerURL   = "tcp://127.0.0.1:1883"
		idPrefix    = "hz_"
		topicPrefix = "/dev/1/2/3/4/5/6/7/8/9/10/"
		clientCount = 1000
	)

	for n := 0; n < clientCount; n++ {
		suffix := strconv.Itoa(n)

		options := mqtt.NewClientOptions()
		options.AddBroker(brokerURL)
		options.SetKeepAlive(time.Second * 120)
		options.SetClientID(idPrefix + suffix)

		client := mqtt.NewClient(options)

		connected := client.Connect()
		if connected.Wait() && connected.Error() != nil {
			panic(connected.Error())
		}

		subscribed := client.Subscribe(topicPrefix+suffix, 0, f2)
		if subscribed.Wait() && subscribed.Error() != nil {
			fmt.Println(subscribed.Error())
			return
		}

		// Brief pause between clients, as in the original pacing.
		time.Sleep(time.Millisecond)
	}

	ready <- struct{}{}
}

// pub waits for a value on ready, then publishes the current
// time.Now().UnixNano() value (as decimal text) to the topic
// "/dev/1/2/3/4/5/6/7/8/9/10/10" at QoS 0 once per second, forever.
//
// The publishing client is created and connected once, before the loop.
// Connecting a new client with the same ID on every iteration leaked one
// connection per second, since nothing ever disconnected them. A failed
// connect panics; a failed publish is printed and the loop continues.
func pub() {
	<-ready
	fmt.Println("开始模拟生产")

	opts := mqtt.NewClientOptions().AddBroker("tcp://127.0.0.1:1883")
	opts.SetClientID("pub_once")

	c := mqtt.NewClient(opts)
	if token := c.Connect(); token.Wait() && token.Error() != nil {
		panic(token.Error())
	}

	for {
		time.Sleep(time.Second)

		// Take the timestamp immediately before publishing so the payload
		// reflects the send time as closely as possible.
		b := []byte(strconv.FormatInt(time.Now().UnixNano(), 10))

		token := c.Publish("/dev/1/2/3/4/5/6/7/8/9/10/10", 0, false, b)
		if token.Wait() && token.Error() != nil {
			fmt.Println(token.Error())
		}
	}
}

// main opens (creating if needed) the append-mode log file "sb.log", starts
// a goroutine that syncs it to disk once per second, creates the ready
// channel, launches sub and pub, and then blocks forever.
//
// If the log file cannot be opened the error is written to stderr and the
// process exits with status 1, rather than running with a nil file and
// silently failing every write.
func main() {
	var err error
	f, err = os.OpenFile("sb.log", os.O_CREATE|os.O_APPEND|os.O_RDWR, 0660)
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}

	go func() {
		for {
			time.Sleep(time.Second)
			if err := f.Sync(); err != nil {
				fmt.Println(err)
			}
		}
	}()

	// Buffer of one lets sub signal completion without waiting for pub to be
	// scheduled.
	ready = make(chan struct{}, 1)

	go sub()
	go pub()

	// Block forever; the worker goroutines never finish on their own.
	select {}
}
